Context
在Vert.x核心组件概览一文中已经谈到了Context的基本概念,在更加深入地了解VertxImpl,Verticle的内部原理等等之前,有必要了解Context的一些内部机制。本文将对Vert.x的Context进行深入分析。
先来看下Context的结构
最顶层的是Context
接口,下面是ContextInternal
接口,这个接口是提供给Vert.x内部使用的API,ContextImpl
是一个抽象类,包含了Context的基本属性以及方法,再往下层就是Context的实现类,其实看到源码可以知道Vert.x中一共有四种Context,但是这个BenchmarkContext的目的不是给用户使用的,它主要被用来对Vert.x一些关键部分进行性能测试,比如Json的编解码、HTTP头部编码、HTTP Handler处理HttpRequest和HttpResponse的速度等等…可以参考这里。就是说,真正在Vert.x中应用的Context就三种。三种Context使用的场景在这里就不再介绍,可以参考对应Verticle的用途。
EventLoopContext
我们先对EventLoopContext进行分析。
isEventLoopContext(),isMultiThreadedWorkerContext()
EventLoopContext本身只有几个方法,isEventLoopContext()
方法和isMultiThreadedWorkerContext()
方法就不用多说了,继承ContextImpl
中的抽象方法,表明自己身份用的。
checkCorrectThread()
先看下checkCorrectThread
方法,这个是实现了ContextImpl
类中抽象方法的方法,会去判断当前线程是不是Vertx线程,如果是Vertx线程还会判断context的线程和当前线程是不是相同线程,看代码
|
|
executeAsync()
接着看executeAsync()
方法,在抽象类ContextImpl
中也声明了executeAsync()
抽象方法
找到实际调用这个方法的地方
ContextImpl
这个类中多次用到了模板方法的设计模式,例如上面的executeAsync()
和checkCorrectThread()
都用到了,将抽象方法写在指定逻辑中,然后交给子类去实现这些抽象方法。
runOnContext()
方法是定义在Context
接口中的,用来指明在当前context中异步地执行指定的操作,这个方法在很多地方都用到了,举例你在Context关联的线程之外想去执行一个action,但是这个action并不是在你当前所在的Vertx线程控制的范围内执行(例如你new Thread去执行),现在你想要的是这个action会改变一些原来Vertx线程内的状态,如果想让这些改变的状态在这个context thread之中可见,就必须使用runOnContext()
方法去执行。
executeAsync()
方法中的逻辑
|
|
ContextImpl#nettyEventLoop()
nettyEventLoop()
方法在ContextImpl
类中,返回持有的EventLoop实例,而这个EventLoop instance除非自己指定,否则是使用在ContextImpl
的Constructor中传入的VertxInternal
对象获取的,通过这个vertx对象的getEventLoopGroup()
方法拿到EventLoopGroup,接着从Netty的EventLoopGroup
中通过next()
方法选出一个EventLoop
作为返回值,注意这里的EventLoopGroup
和EventLoop
是Netty中的概念。
这段是《Netty in Action》中提到的Channel,EventLoop,Thread以及EventLoopGroup之间的关系:
一个EventLoopGroup包含一个或者多个EventLoop
一个EventLoop在它的生命周期内之和一个Thread关联
所有由EventLoop处理的I/O事件都将在它专有的Thread上被处理
一个Channel在它的生命周期内只会注册到一个EventLoop
一个EventLoop可能会指派到一个或多个Channel
在这种设计中,一个给定Channel的I/O操作都是由相同的Thread执行的,实际上消除了对于同步的需要。
这样就从netty的EventLoopGroup
中拿到了EventLoop
,接下来就是用wrapTask()
方法将Vert.x中的要处理的event封装成Runnable接口提交给Netty的EventLoop了。
注:提交的EventLoop不仅仅是NioEventLoop, Vert.x在3.5.0版本加入了对Linux上Epoll和OS X上Kqueue的Native Transport支持,Netty中的异步传输可以不再只是使用NIO了。关于Netty中EventLoop为什么使用
execute()
方法接收Runnable接口,可以去找一找相关的EventLoop和JUC中Executor的资料。
我们对wrapTask()
方法进行简化,省去一些Metrics管理,时间记录,检查操作的逻辑以及错误处理,得到的源码是这样的
可以看到简化后的wrapTask()
方法并没有太复杂的操作,仅仅将executeAsync(Handler<Void> task)
方法传入的参数task封装成一个Runnable
,做一些基本的检查线程的操作,交给hTask.handle(null)
处理就行了。需要注意的只有一点,就是这个Runnable
执行前会将它执行时的线程(必须是Vertx线程)与你调用runOnContext()
方法的context关联。到现在可以确认的是,我们已经知道从Vert.x EventLoopContext上拿到的Task已经能够在Netty的EventLoop上面跑了。
Constructor
EventLoopContext
的构造器调用的是父类的构造器,现在我们将目光转向ContextImpl类。
ContextImpl
ContextImpl这个抽象类,内容非常的多。
Properties
按照顺序逐个分析,Logger不必说,先看六个常量,分别对应三组Key-Value,String常量表示Key,Boolean常量表示Value。THREAD_CHECKS_PROP_NAME
对应THREAD_CHECKS
,负责进行线程检查的开关;DISABLE_TIMINGS_PROP_NAME
对应DISABLE_TIMINGS
,负责进行时间计量的开关;DISABLE_TCCL_PROP_NAME
对应DISABLE_TCCL
,负责TCCL线程上下文类加载器的检测的开关。
VertxInternal
owner
变量很直观,就是对当前的Vertx引用,之前讲过一个Vert.x实例会有多个Eventloops,而每个EventLoop同一时间只会关联一个Context,反过来想一个Context也就对应一个Vert.x实例。这里类型是VertxInternal,主要面向内部使用。
deploymentID, config
然后是deploymentID
和config
两个变量,这两个变量不复杂,找到相关联的地方就知道是给Verticle使用的属性,其中deploymentID
通过generateDeploymentID()
这个方法生成,
在部署Verticle时候作为返回结果,而config
就是部署Verticle时DeploymentOptions
这个对象所包含的config属性的拷贝。前面也讲过了每个Verticle实例都会关联到唯一的一个Context,如果你看一眼AbstractVerticle
里面的deploymentID()
与config()
方法,你会发现它们都调用的是所关联的context的对应方法。
closeHooks
closeHooks提供了提醒自动清理的功能,在Verticle或者Vertx实例关闭的时候,上面跑的组件如果有closeHooks就可以实现自动清理。这个功能主要通过观察者模式实现,有兴趣可以结合源码并参考这篇文章看看。
https://github.com/vietj/vertx-materials/blob/master/src/main/asciidoc/Close_hooks.adoc
tccl
Context所在线程的Thread Context ClassLoader,通过构造器和set方法进行设置。
eventLoop
Context对应的Netty的EventLoop实例,之前在讲EventLoopContext
时候已经提过了。
contextThread
当前Context所关联的VertxThread实例,在ContextImpl
类主要被用来做一些检查操作。
contextData
这个官方文档说的非常明白,contextData就是让你可以在同一个Context的handlers之间共享任意类型的数据,通过put()
,get()
,remove()
方法进行操作,数据结构就是ConcurrentHashMap
。
获取contextData对象的代码
exceptionHandler
为Context注册一个exceptionHandler,如果Context执行的action抛出异常自己无法捕获时,就会使用这个Context本身的exceptionHandler。
可以自己显式地调用exceptionHandler(Handler<Throwable> handler)
方法去给context注册,还可以通过关联的VertxInternal实例中的exceptionHandler注册。
还是ContextImpl
中的wrapTask()
方法,看下怎么处理异常的,把逻辑简化下
如果try块中action自己处理不了异常,那么就先交给context的exceptionHandler来处理。如果这个exceptionHandler为空,就会使用关联的owner的exceptionHandler处理。
deployment
Vert.x中verticle的deploy与undeploy操作都是交给DeploymentManager或者HAManager去处理的,Context中deployment对象就是一个DeploymentImpl
实例,DeploymentImpl
类是DeploymentManager
类中的一个内部类,实现了Deployment
接口,任务就是负责维护与Context相关联的Verticle的状态。ContextImpl
中getInstanceCount()
方法就是调用deployment实例中deploymentOptions()
方法来获取verticle instance数量。
processArgs()方法
从Launcher命令行获取到的参数,如果没有使用Launcher(拿到参数为null)那么就返回Starter的参数。AbstractVerticle
#processArgs()
方法调用的也是context的processArgs()
方法。
注:Starter已经Deprecated,一般都会使用Launcher。
setContext()静态方法
将调用时所在的当前线程与指定Context关联起来。
executeFromIO()方法
先看代码
这个方法接受一个ContextTask参数,ContextTask是一个函数接口。经过线程检查以后,执行的是wrapTask(task, null, true, null).run();
。这个方法当第一个参数不为空时,调用的是cTask.run()
而不是hTask.handle(null)
,也就是说直接在当前线程上执行该task。executeFromIO()
方法什么时候去调用它呢?就是当你需要在IO环境中去执行一些代码,比如说你需要在Netty代码中的ChannelFuture
去处理一些Vert.x的handler回调逻辑时,这个时候就必须有一个对vert.x context的引用,然后用这个context的executeFromIO()
方法去处理你的Vert.x API的逻辑。
多余的一些字段和方法
isOnWorkerThread()
isOnEventLoopThread()
isOnVertxThread()
isWorkerContext()
isEventLoopContext()
isMultiThreadedWorkerContext()
这些方法都能从字面上理解,源码也不复杂,这里就省略了
还有一些非常重要的Stuff
在谈WorkerContext之前,还需要更加深入ContextImpl
,前面讲EventLoopContext
的时候说的比较简略,现在我们dive deeper一些。
Constructor
先回到ContextImpl
类的构造器方法
|
|
getEventLoop()
这个方法之前已经讲过了,我们将目光转向这两个字段internalBlockingPool
跟workerPool
。查看类型发现它们都是WorkerPool,WorkerPool
就是包装了一个ExecutorService
与PoolMetrics
的类,那么它们两个Pool之间有什么区别呢?首先来看一看internalBlockingPool的工作目的,这个pool是给Vert.x内部进行的阻塞操作使用,比如你使用FileSystem的一些操作或者AsyncFile的flush操作时,Vert.x会将这些阻塞的操作交给internalBlockingPool
而不是workerPool
去处理。与之相对的workerPool
就是提供给用户来进行阻塞操作的pool。
TaskQueue
ContextImpl
中还有两个TaskQueue
,这个类的目的很明确
A task queue that always run all tasks in order.
剖析一下TaskQueue
的结构,首先TaskQueue有一个LinkedList<Task>
列表去维护当前TaskQueue队列中需要被执行的任务Task,其中Task
是一个自定义的静态内部类,以及一个Executor
引用表示当前的Executor
,还有一个Runnable引用。TaskQueue的构造方法很简单,就是直接让自己的Runnable引用runner
指向自己run
方法。
|
|
调用队列的execute()
方法可以执行一个新的任务
先对队列进行同步操作,将当前task封装成Task
对象加入tasks队列。判断当前任务是否执行完了,如果之前的任务执行完了才去执行当前这个任务,否则execute()
只会将任务加到队列,然后退出循环,这样就保证了同一个队列TaskQueue
内的任务是一个个执行的,如果任务发送过多而前面没有处理完,就会出现任务堆积。
接着是run
方法
先进入死循环,声明当前即将被执行的task
, 接着对tasks进行同步避免冲突操作,通过LinkedList
的poll()
从队列头取出一个任务交给引用task
。
这个时候会有一些条件的判断,如果取出的是null,说明队列中没有任务需要执行了,这个时候将current
置为null,并且退出循环,这样下次进入循环的时候就能执行任务了。
如果取到任务后发现当前取到的任务关联的executor不是当前的TaskQueue
中的current executor,那么将这个任务重新退回队列第一个,并且将runner
加入与这个任务相关联的executor(执行会在稍后发生,这里不一定会立即执行),并且将TaskQueue
的current置为该executor,这个过程做完就退出循环。
如果之前两个条件都满足了,说明这个任务现在是可以执行的,那么直接使用task.runnable.run();
执行阻塞操作。这里用一个流程图来看更直观一点。
现在我们已经知道如何通过TaskQueue
来保障阻塞操作执行的顺序。接下来看executeBlocking()
方法
executeBlocking()方法
在ContextImpl
类中一共有5个executeBlocking()
重载方法,一个一个来看他们的作用。
先看第一个方法代码
这个代码已经有一行注释了,上面清楚地写着是按顺序执行一个internal的阻塞task。将当前context的internalBlockingPool executor与internal队列传入重载的executeBlocking()
方法,现在来看这个重载方法的代码,所有的逻辑都是在这里完成的,省去Metrics和Timing部分
首先声明一个Runnable
对象command对即将进行的阻塞操作进行封装,由于传入blockingCodeHandler
为null,因此直接执行action,将结果告知给前面定义的Future中并且设置好resultHandler回调,最后就是将这个command
交给internal队列去处理了,queue.execute(command, exec);
使得在交给队列之前将这个任务与当前context下的internalBlockingPool
的executor
关联。
接下来就是这个重载方法
这个方法就是有序直接执行阻塞任务。继续下一层重载方法
这个方法会去指定执行的executor是workerPool
中的executor,并且如果有序的话就用orderedTasks去保证执行任务的顺序,继续回到上面实现真正逻辑的重载方法。
这次blockingCodeHandler
不为空,那么就将当前所在线程的context设置为正在用的context,并且将异步操作的结果通过future与resultHandler回调hook上。如果有序就用taskQueue保证任务的执行顺序,无序就直接丢给对应的executor执行。
还有最后一个方法是自己指定TaskQueue
的重载方法,逻辑跟有序执行基本差不多,只不过TaskQueue
是自己指定的,这里就省略了。
在理清了ContextImpl
的一切之后,接下来看WorkerContext
与MultiThreadedWorkerContext
就没有那么复杂了。
WorkerContext
isEventLoopContext()
方法与isMultiThreadedWorkerContext()
方法当然都返回false。接下来就是executeAsync()
方法
可以看到仅仅是将task封装成Runnable对象然后提交到TaskQueue中去执行。
MultiThreadedWorkerContext
MultiThreadedWorkerContext
继承了WorkerContext
,不同之处就是isMultiThreadedWorkerContext()
方法与executeAsync()
方法进行了重写。
前者不用说,直接看executeAsync()
在MultiThreadedWorkerContext
中,executeAsync()
方法会将task直接交给workerPool的executorService去处理。
小结
一个线程可能会被多个Context所使用,但是一个Context只可能同时在一个线程执行,不可能出现一个Context同时关联多个线程的情况。
调用Vert.x的API时候,如果当前线程是Vert.x线程,那么此时就会复用这个Vert.x线程已经关联好的Context,如果不是就会创建一个新的Context。
一个Verticle 部署以后会关联一个新的Context,并且只对应这个Context,Context类型会由
DeploymentOptions
决定, 就是之前讲的三种类型。所有在Verticle里面执行的handler都会用Verticle关联的唯一的Context。另外,如果在Standard Verticle里面调用executeBlocking()
方法执行阻塞代码,尽管任务被丢给worker pool里面的一个线程去执行,但Context这个时候还是EventLoopContext
类型。